package org.budo.warehouse.logic.consumer.mail;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

import javax.annotation.Resource;

import org.apache.commons.mail.SimpleEmail;
import org.budo.graph.annotation.SpringGraph;
import org.budo.support.dao.page.Page;
import org.budo.support.javax.sql.util.JdbcUtil;
import org.budo.support.servlet.util.QueryStringUtil;
import org.budo.time.Time;
import org.budo.warehouse.logic.api.DataEntry;
import org.budo.warehouse.logic.api.DataMessage;
import org.budo.warehouse.logic.consumer.AbstractDataConsumer;
import org.budo.warehouse.logic.consumer.DataEntryPojo;
import org.budo.warehouse.logic.producer.DataMessageImpl;
import org.budo.warehouse.service.api.IDataNodeService;
import org.budo.warehouse.service.api.IMailBufferService;
import org.budo.warehouse.service.entity.DataNode;
import org.budo.warehouse.service.entity.MailBuffer;
import org.budo.warehouse.service.entity.Pipeline;

import com.alibaba.fastjson.JSON;

import lombok.extern.slf4j.Slf4j;

/**
 * Data consumer that buffers incoming {@link DataMessage}s as {@link MailBuffer}
 * rows and sends the buffered rows by e-mail in batches.
 * <p>
 * A batch is sent when either the number of not-yet-sent buffers for the
 * pipeline reaches {@code bufferSize}, or the delay check based on
 * {@code maxDelay} (seconds) passes. Both values are read from the query string
 * of the target data node URL, e.g.
 * {@code mail:smtp://smtp.qq.com:465?bufferSize=5&maxDelay=5}.
 *
 * @author lmw
 */
@Slf4j
public class MailDataConsumer extends AbstractDataConsumer {
    private static final String BUFFER_SIZE_KEY = "bufferSize";

    private static final String MAX_DELAY_KEY = "maxDelay";

    private static final int MAX_DELAY_DEFAULT = 5;

    private static final int BUFFER_SIZE_DEFAULT = 2;

    /** Table whose INSERT events are skipped so that buffering does not feed itself. */
    private static final String MAIL_BUFFER_TABLE = "t_mail_buffer";

    private static final String EVENT_TYPE_INSERT = "INSERT";

    @Resource
    private IMailBufferService mailBufferService;

    @Resource
    private IDataNodeService dataNodeService;

    /**
     * Stores the message in the mail buffer and flushes the buffer by e-mail
     * when the size or delay threshold is reached.
     *
     * @param dataMessage the incoming message; entries that are INSERTs into the
     *                    mail buffer table are dropped before buffering
     */
    @SpringGraph
    @Override
    public void consume(DataMessage dataMessage) {
        List<DataEntry> dataEntryPojos = new ArrayList<DataEntry>();
        for (DataEntry dataEntry : dataMessage.getDataEntries()) {
            if (isMailBufferInsert(dataEntry)) {
                log.info("#36 dataEntry={}", dataEntry); // skip to avoid recursion
                continue;
            }

            dataEntryPojos.add(new DataEntryPojo(dataEntry));
        }

        if (dataEntryPojos.isEmpty()) {
            log.warn("#44 dataEntryPojos={}, dataMessage={}", dataEntryPojos, dataMessage);
            return;
        }

        DataMessage dataMessagePojo = new DataMessageImpl(dataMessage.getDataNodeId(), dataEntryPojos);
        String dataMessageJson = JSON.toJSONString(dataMessagePojo);

        Pipeline pipeline = this.getPipeline();
        Integer pipelineId = pipeline.getId();
        mailBufferService.insert(new MailBuffer(pipelineId, dataMessageJson));

        DataNode targetDataNode = dataNodeService.findByIdCached(pipeline.getTargetDataNodeId());
        String targetUrl = targetDataNode.getUrl();

        // Batch size, defaults to 2 buffered messages per mail.
        Integer bufferSize = QueryStringUtil.getParameterInteger(targetUrl, BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
        Integer count = mailBufferService.countNotSentByPipelineId(pipelineId);
        if (count >= bufferSize) {
            log.info("#72 bufferSize={}, count={}, pipeline={}, targetDataNode={}", bufferSize, count, pipeline, targetDataNode);
            this.sendMail(pipeline);
            return;
        }

        // Delay in seconds, defaults to 5.
        Integer maxDelay = QueryStringUtil.getParameterInteger(targetUrl, MAX_DELAY_KEY, MAX_DELAY_DEFAULT);
        Timestamp maxCreatedAt = mailBufferService.findNotSentMaxCreatedAtByPipelineId(pipelineId);
        if (maxCreatedAt == null) {
            // No unsent timestamp available (e.g. rows were flushed in the meantime); nothing to time out.
            return;
        }

        // NOTE(review): judging by the service method name this is the NEWEST unsent
        // row, which includes the row inserted above, so this condition looks like it
        // can rarely be true. Confirm whether the oldest unsent row was intended.
        if (Time.now().isAfter(Time.when(maxCreatedAt).plusSecond(maxDelay))) {
            log.info("#81 maxDelay={}, maxCreatedAt={}, pipeline={}, targetDataNode={}", maxDelay, maxCreatedAt, pipeline, targetDataNode);
            this.sendMail(pipeline);
        }
    }

    /**
     * Tells whether the entry is an INSERT into the mail buffer table. The
     * comparisons are constant-first so a missing table name or event type does
     * not raise a NullPointerException.
     */
    private static boolean isMailBufferInsert(DataEntry dataEntry) {
        return MAIL_BUFFER_TABLE.equals(dataEntry.getTableName()) //
                && EVENT_TYPE_INSERT.equals(dataEntry.getEventType());
    }

    /**
     * Sends all not-yet-sent buffers of the pipeline in a single e-mail and then
     * stamps them as sent. The pipeline's target table is used as the recipient
     * and its target schema as the subject. Failures are logged and not
     * propagated, so a mail problem does not fail the consumer.
     *
     * @param pipeline the pipeline whose buffers are flushed
     */
    private void sendMail(Pipeline pipeline) {
        try {
            DataNode targetDataNode = dataNodeService.findById(pipeline.getTargetDataNodeId());

            List<MailBuffer> mailBuffers = mailBufferService.listNotSentByPipelineId(pipeline.getId(), Page.max());
            log.info("#69 mailBuffers={}, pipeline={}", mailBuffers, pipeline);

            if (mailBuffers == null || mailBuffers.isEmpty()) {
                // Nothing to send; avoid an empty mail and a pointless sentAt update.
                return;
            }

            // mail:smtp://smtp.qq.com:465?bufferSize=5&maxDelay=5
            // NOTE(review): only the host is taken from the URL; the port in the sample
            // URL does not appear to be applied here. Confirm.
            SimpleEmail simpleEmail = new SimpleEmail();
            simpleEmail.setHostName(JdbcUtil.getHost(targetDataNode.getUrl()));
            simpleEmail.setAuthentication(targetDataNode.getUsername(), targetDataNode.getPassword());

            // NOTE(review): no from-address is set; commons-email needs one unless a
            // mail.from / mail.smtp.from property is configured. Confirm.
            simpleEmail.addTo(pipeline.getTargetTable()) //
                    .setSubject(pipeline.getTargetSchema())//
                    .setMsg(JSON.toJSONString(mailBuffers)) //
                    .send();

            // NOTE(review): this marks every unsent row of the pipeline, which may include
            // rows inserted after the list above was read. Confirm this is acceptable.
            mailBufferService.updateSentAtByPipelineId(Time.now().toTimestamp(), pipeline.getId());
        } catch (Throwable e) {
            // Deliberately best-effort: log and carry on.
            log.error("#111 pipeline={}, e={}", pipeline, e, e);
        }
    }
}